Add local dynamic filter support in IcebergPageSourceProvider #5719
@knwg145 could you please push the code changes for the comments you've marked as resolved?
@@ -29,6 +29,7 @@
import io.prestosql.execution.warnings.WarningCollector;
import io.prestosql.metadata.Metadata;
import io.prestosql.operator.OperatorStats;
import io.prestosql.server.DynamicFilterService;
unused
DistributedQueryRunner runner = (DistributedQueryRunner) getQueryRunner();
ResultWithQueryId<MaterializedResult> result = runner.executeWithQueryId(
        withBroadcastJoin(),
        "SELECT * FROM lineitem JOIN orders ON lineitem.orderkey = orders.orderkey");
Please use lineitem.suppkey = supplier.suppkey instead.
In this case the dynamic filter is set to all because the build side is too big, whereas we want to test the case where there is a dynamic filter but it doesn't filter anything.
@@ -1389,4 +1395,55 @@ private void dropTable(String table)
assertUpdate(session, "DROP TABLE " + table);
assertFalse(getQueryRunner().tableExists(session, table));
}

@Test
public void testLocalDynamicFilterWithEmptyBuildSide()
These tests rely on the fact that the probe side will not progress until the build side is ready. Because of that, these tests are fragile and might break when we change execution (which we will).
To make these tests reliable, Iceberg should support either waiting for the local DF or waiting for the DF during split generation. Then (in tests) we can enforce the presence of the DF before the table scan happens.
Could we tackle introducing waiting for split generation, as in Hive, separately and let this one go through with less restrictive assertions?
Interestingly, TestHiveDistributedJoinQueries#testJoinWithEmptyBuildSide is not flaky despite not using lazy DF to delay the probe side.
This would prevent optimizations like #3957.
I'm fine with having local DF without testing for now (it's a one-liner), but please create an issue in prestosql (for blocking on DFs locally) and mention it in the code.
Alternatively, we can simply add that blocking because it's pretty straightforward (just pass the DF future to IcebergPageSource and block there).
@@ -167,7 +168,7 @@ public ConnectorPageSource createPageSource(
split.getLength(),
split.getFileFormat(),
regularColumns,
table.getPredicate());
table.getPredicate().intersect(dynamicFilter.getCurrentPredicate().transform(IcebergColumnHandle.class::cast).simplify(100)));
If you passed dynamicFilter to IcebergPageSource, you could easily block on the DF in io.prestosql.spi.connector.ConnectorPageSource#isBlocked. However, this should be behind a feature toggle.
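To illustrate the suggestion, here is a minimal, self-contained sketch of a page source that blocks on a dynamic filter future behind a toggle. It does not use the real SPI: SketchDynamicFilter, BlockingPageSourceSketch, and the waitForDynamicFilter flag are hypothetical names made up for this example, and strings stand in for pages.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the engine's dynamic filter handle (not the real SPI type).
interface SketchDynamicFilter
{
    // Completes once the filter has been collected from the build side.
    CompletableFuture<?> isBlocked();
}

// Hypothetical page source that produces no pages until the dynamic filter is ready.
final class BlockingPageSourceSketch
{
    private static final CompletableFuture<?> NOT_BLOCKED = CompletableFuture.completedFuture(null);

    private final SketchDynamicFilter dynamicFilter;
    private final boolean waitForDynamicFilter; // assumed feature toggle
    private final Queue<String> pages; // strings stand in for real pages

    BlockingPageSourceSketch(SketchDynamicFilter dynamicFilter, boolean waitForDynamicFilter, List<String> pages)
    {
        this.dynamicFilter = dynamicFilter;
        this.waitForDynamicFilter = waitForDynamicFilter;
        this.pages = new ArrayDeque<>(pages);
    }

    // Reports the filter's future when the toggle is on; never blocks otherwise.
    CompletableFuture<?> isBlocked()
    {
        if (!waitForDynamicFilter) {
            return NOT_BLOCKED;
        }
        return dynamicFilter.isBlocked();
    }

    // Returns null while blocked or when no pages are left.
    String getNextPage()
    {
        if (!isBlocked().isDone()) {
            return null;
        }
        return pages.poll();
    }

    boolean isFinished()
    {
        return isBlocked().isDone() && pages.isEmpty();
    }
}
```

With the toggle off, the sketch behaves like a page source that ignores the filter's readiness, which matches the current behavior of the PR.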
It seems from the code that stripe pruning happens before IcebergPageSource is created, in createDataPageSource -> createOrcPageSource -> reader.createRecordReader -> OrcRecordReader. So we would miss that even if we block in IcebergPageSource.
I think row group pruning could still be accomplished by blocking on the DF in IcebergPageSource if dynamicFilter is pushed into StripeReader as well. Not sure if the changes are worth doing though.
Please correct me if I'm missing something.
We could probably call createDataPageSource lazily in IcebergPageSource (once the DF is ready). This way we don't allocate resources until the DF is ready (I'm not sure it's a big issue, though).
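A rough sketch of the lazy variant, under the same caveats: SketchReader and LazyPageSourceSketch are hypothetical names, the supplier stands in for the createDataPageSource call, and strings stand in for pages. The point is only that the underlying reader is not created, so no pruning or resource allocation happens, until the DF future completes.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical stand-in for the underlying data reader (for example, an ORC reader).
interface SketchReader
{
    String nextPage();
}

// Hypothetical page source that defers creating its reader until the dynamic filter is ready.
final class LazyPageSourceSketch
{
    private final CompletableFuture<?> dynamicFilterReady;
    private final Supplier<SketchReader> readerFactory; // stands in for createDataPageSource
    private SketchReader delegate; // created on first use after the filter is ready

    LazyPageSourceSketch(CompletableFuture<?> dynamicFilterReady, Supplier<SketchReader> readerFactory)
    {
        this.dynamicFilterReady = dynamicFilterReady;
        this.readerFactory = readerFactory;
    }

    CompletableFuture<?> isBlocked()
    {
        return dynamicFilterReady;
    }

    // Returns null while the filter is pending; creates the reader at most once afterwards.
    String getNextPage()
    {
        if (!dynamicFilterReady.isDone()) {
            return null;
        }
        if (delegate == null) {
            delegate = readerFactory.get();
        }
        return delegate.nextPage();
    }
}
```

Because the reader is built only after the future completes, the factory can read the final predicate at that point, so anything the reader prunes at construction time would see the completed filter.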
Superseded by #9538
Uses the dynamic filter in IcebergPageSourceProvider to filter results